exchangis中添加不同数据源使用datax进行同步
新建数据源类
首先需要新建数据源类,用来将传入的datax配置转化为datax的json格式,下面以clickhouse为例
在exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/transform/handlers中新建ClickhouseDataxSubExchangisJobHandler.java文件
package com.webank.wedatasphere.exchangis.job.server.builder.transform.handlers;
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.domain.SubExchangisJob;
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamDefine;
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamSet;
import com.webank.wedatasphere.exchangis.job.domain.params.JobParams;
import com.webank.wedatasphere.exchangis.job.server.builder.JobParamConstraints;
import com.webank.wedatasphere.exchangis.job.server.utils.SQLCommandUtils;
import org.apache.linkis.common.exception.ErrorException;
import java.util.*;
import java.util.stream.Collectors;
/**
* Oracle in datax */public class ClickhouseDataxSubExchangisJobHandler extends AuthEnabledSubExchangisJobHandler {
/**
* Host */ private static final JobParamDefine<String> SOURCE_HOST = JobParams.define("connection[0].jdbcUrl[0].host", JobParamConstraints.HOST);
private static final JobParamDefine<String> SINK_HOST = JobParams.define("connection[0].jdbcUrl.host", JobParamConstraints.HOST);
/**
* Port */ private static final JobParamDefine<String> SOURCE_PORT = JobParams.define("connection[0].jdbcUrl[0].port", JobParamConstraints.PORT);
private static final JobParamDefine<String> SINK_PORT = JobParams.define("connection[0].jdbcUrl.port", JobParamConstraints.PORT);
/**
* ServiceName */ private static final JobParamDefine<String> SOURCE_DATABASE_NAME = JobParams.define("connection[0].jdbcUrl[0].database", JobParamConstraints.DATABASE);
private static final JobParamDefine<String> SINK_DATABASE_NAME = JobParams.define("connection[0].jdbcUrl.database", JobParamConstraints.DATABASE);
/**
* Table */ private static final JobParamDefine<String> SOURCE_TABLE = JobParams.define("table", JobParamConstraints.TABLE);
private static final JobParamDefine<String> SINK_TABLE = JobParams.define("connection[0].table[0]", JobParamConstraints.TABLE);
/**
* Connect params */ private static final JobParamDefine<Map<String, String>> SOURCE_PARAMS_MAP = JobParams.define("connection[0].jdbcUrl[0].connParams", JobParamConstraints.CONNECT_PARAMS,
connectParams -> Json.fromJson(connectParams, Map.class), String.class);
private static final JobParamDefine<Map<String, String>> SINK_PARAMS_MAP = JobParams.define("connection[0].jdbcUrl.connParams", JobParamConstraints.CONNECT_PARAMS,
connectParams -> Json.fromJson(connectParams, Map.class), String.class);
/**
* Where condition */ private static final JobParamDefine<String> SOURCE_WHERE_CONDITION = JobParams.define(JobParamConstraints.WHERE);
/**
* Query sql */ private static final JobParamDefine<String> QUERY_SQL = JobParams.define("connection[0].querySql[0]", job -> {
JobParamSet sourceParams = job.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE);
String where = SOURCE_WHERE_CONDITION.getValue(sourceParams);
List<String> columns = job.getSourceColumnsgetName).collect(Collectors.toList();
if (columns.isEmpty()) {
columns.add("*");
}
return SQLCommandUtils.contactSql(Collections.singletonList(sourceParams
.get(JobParamConstraints.TABLE).getValue()), null, columns, null, where);
}, SubExchangisJob.class);
/**
* SQL column */ private static final JobParamDefine<List<String>> SQL_COLUMN = JobParams.define("column", job -> {
List<String> columns = job.getSinkColumnsgetName).collect(Collectors.toList();
if (columns.isEmpty()) {
columns.add("*");
}
return columns;
}, SubExchangisJob.class);
@Override
public void handleJobSource(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {
JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE);
if (Objects.nonNull(paramSet)) {
Arrays.asList(sourceMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));
paramSet.add(QUERY_SQL.newParam(subExchangisJob));
}
}
@Override
public void handleJobSink(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {
JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SINK);
if (Objects.nonNull(paramSet)) {
Arrays.asList(sinkMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));
paramSet.add(SQL_COLUMN.newParam(subExchangisJob));
}
}
@Override
public String dataSourceType() {
return "clickhouse";
}
@Override
public boolean acceptEngine(String engineType) {
return "datax".equalsIgnoreCase(engineType);
}
private JobParamDefine<?>[] sourceMappings() {
return new JobParamDefine[]{USERNAME, PASSWORD, SOURCE_TABLE, SOURCE_WHERE_CONDITION,
SOURCE_HOST, SOURCE_PORT, SOURCE_DATABASE_NAME, SOURCE_PARAMS_MAP};
}
public JobParamDefine<?>[] sinkMappings() {
return new JobParamDefine[]{USERNAME, PASSWORD, SINK_TABLE,
SINK_HOST, SINK_PORT, SINK_DATABASE_NAME, SINK_PARAMS_MAP};
}
}
其他数据库的配置也是类似,主要是改变
/**
 * Returns the data source type this handler serves; change the literal
 * to the name of the database being added.
 */
@Override
public String dataSourceType() {
return "clickhouse";
}
将这一块内容中的返回值改为其他数据库的名称即可。另外,有一些数据库不使用database属性,比如oracle、达梦使用的是instance、serviceName等属性,此时需要将database对应的参数定义改为serviceName,例如:
/**
* Service name: takes the place of the database parameter for data sources
* that are addressed by service name. The source side is written under
* connection[0].jdbcUrl[0], the sink side under connection[0].jdbcUrl.
**/
private static final JobParamDefine<String> SOURCE_SERVICE_NAME = JobParams.define("connection[0].jdbcUrl[0].serviceName", JobParamConstraints.SERVICE_NAME);
private static final JobParamDefine<String> SINK_SERVICE_NAME = JobParams.define("connection[0].jdbcUrl.serviceName", JobParamConstraints.SERVICE_NAME);
在datax中添加构建jdbcUrl语句的功能
exchangis中,构建完datax的json格式后jdbcUrl的格式为LinkedMap格式,并不符合datax需要的string格式


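因此在datax中,需要将这个linkedMap(包含host、port、database、connParams等键)拼装为datax需要的string格式的jdbcUrl,形如 jdbc:clickhouse://host:port/database(如有连接参数,再以 ? 追加在后面)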
在exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java中修改dealJdbcAndTable方法,原有的方法是对每一个数据库都有一个拼装语句,我们需要添加自己需要的Clickhouse的JdbcUrl语句

添加一个clickhouse的jdbcUrl拼装
else if (DATABASE_TYPE.equals(DataBaseType.ClickHouse)) {
    // Each jdbcUrl entry is a map (host / port / database / connParams), not a ready-made string
    List<Object> jdbcUrlObjects = connConf.getList(Key.JDBC_URL);
    for (Object obj : jdbcUrlObjects) {
        Map<String, Object> map = (Map<String, Object>) obj;
        // Start from the job-level connection parameters, then overlay the per-url ones
        Map<String, Object> parameterMap = originalConfig.getMap(Key.CONNPARM, new HashMap<>());
        Object connParams = map.get(Key.CONNPARM);
        if (connParams != null) {
            parameterMap.putAll((Map<String, Object>) connParams);
        }
        String parameter = parameterMap.entrySet().stream()
                .map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue())))
                .collect(Collectors.joining("&"));
        // toString() is kept on purpose: a missing host/port/database fails here
        // instead of producing a URL that contains the text "null"
        String jcUrl = Key.JDBCCLICKHOUSE + map.get(Key.HOST).toString()
                + ":" + map.get(Key.PORT).toString()
                + "/" + map.get(Key.DATABASE).toString();
        if (!parameter.isEmpty()) {
            jcUrl = jcUrl + "?" + parameter;
        }
        jdbcUrls.add(jcUrl);
    }
}
同样的,需要在writer中也添加clickhouse的jdbcUrl拼装语句
在exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java中
else if (DATABASE_TYPE.equals(DataBaseType.ClickHouse)) {
    // On the writer side jdbcUrl is a single map (host / port / database / connParams)
    Map<String, Object> map = connConf.getMap(com.alibaba.datax.plugin.rdbms.reader.Key.JDBC_URL);
    // Start from the job-level connection parameters, then overlay the per-url ones
    Map<String, Object> parameterMap = originalConfig.getMap(com.alibaba.datax.plugin.rdbms.reader.Key.CONNPARM, new HashMap<>());
    Object connParams = map.get(com.alibaba.datax.plugin.rdbms.reader.Key.CONNPARM);
    if (connParams != null) {
        parameterMap.putAll((Map<String, Object>) connParams);
    }
    // Parameters in the URL query string are separated by '&', matching the reader side
    String parameter = parameterMap.entrySet().stream()
            .map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue())))
            .collect(Collectors.joining("&"));
    // toString() is kept on purpose: a missing host/port/database fails here
    // instead of producing a URL that contains the text "null"
    jdbcUrl = com.alibaba.datax.plugin.rdbms.reader.Key.JDBCCLICKHOUSE
            + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.HOST).toString()
            + ":" + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.PORT).toString()
            + "/" + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.DATABASE).toString();
    if (!parameter.isEmpty()) {
        jdbcUrl = jdbcUrl + "?" + parameter;
    }
}
同时,有些变量需要创建,在exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java创建一个JDBCCLICKHOUSE的jdbc前缀
/** JDBC URL prefix for ClickHouse; host, port and database are appended after it. */
public final static String JDBCCLICKHOUSE = "jdbc:clickhouse://";
修改exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java的Clickhouse的driverClassName

// ClickHouse entry: type name "clickhouse", second argument is the JDBC driver class name
ClickHouse("clickhouse", "com.clickhouse.jdbc.ClickHouseDriver");